AWS Bedrock Agent Example: Sales Data Analysis

This guide shows how to build an AI agent on AWS Bedrock that analyzes sales data and generates reports, based on the agent example from the LLM guide.

Overview

We'll implement an agent that can:

  1. Read sales data from S3
  2. Analyze the data using Python code execution
  3. Generate insights and create a formatted report
  4. Save the report back to S3

Task Example: "Analyze the sales data from last month and create a report"

Architecture

User Request
    ↓
Agent Orchestrator (Lambda or local)
    ↓
┌─────────────────────────────────────┐
│  Amazon Bedrock (Claude/Titan)      │
│  - Understands request              │
│  - Plans actions                    │
│  - Decides what tools to use        │
└─────────────────────────────────────┘
    ↓
Action Groups / Tools
├── read_s3_file (Lambda)
├── execute_python (Lambda)
└── write_s3_file (Lambda)
    ↓
Results fed back to Bedrock
    ↓
Final Response to User
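The loop the diagram describes can be sketched independently of any AWS API. The stub names `llm` and `tools` below are illustrative, not part of any SDK; the real implementations appear later in this guide:

```python
def run_agent(llm, tools, prompt, max_iterations=10):
    """Schematic of the orchestration loop in the diagram:
    the model plans, a tool acts, and the observation feeds back."""
    history = [prompt]
    for _ in range(max_iterations):
        reply = llm(history)                   # model decides the next step
        if reply.get('final'):
            return reply['text']               # FINAL_ANSWER equivalent
        tool = tools[reply['tool']]            # dispatch to an action-group tool
        history.append(tool(**reply['args']))  # observation fed back to the model
    return 'Max iterations reached.'

# Stub demo: a fake "LLM" that calls one tool, then finishes
replies = iter([
    {'tool': 'echo', 'args': {'text': 'hi'}},
    {'final': True, 'text': 'done'},
])
print(run_agent(lambda history: next(replies), {'echo': lambda text: text}, 'task'))  # done
```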

Prerequisites

# AWS CLI configured
aws configure

# Python dependencies
pip install boto3 pandas numpy

# Required AWS services
# - Amazon Bedrock (Claude 3 or Titan)
# - AWS Lambda (for action groups)
# - Amazon S3 (for data storage)
# - IAM roles with appropriate permissions
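The IAM role referenced in Step 3 (`BedrockAgentRole`) must be assumable by the Bedrock service. A minimal trust-policy sketch; the role's attached permissions (model invocation, Lambda access) depend on your account setup:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": { "Service": "bedrock.amazonaws.com" },
      "Action": "sts:AssumeRole"
    }
  ]
}
```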

Implementation Options

Option 1: Bedrock Agents (Managed)

AWS Bedrock Agents provides a fully managed agent framework.

Step 1: Create Action Groups (Lambda Functions)

Lambda: read_s3_file

import boto3
import json

s3 = boto3.client('s3')

def lambda_handler(event, context):
    """Read file from S3"""

    # Parse input from Bedrock Agent
    parameters = event.get('parameters', [])
    bucket = next((p['value'] for p in parameters if p['name'] == 'bucket'), None)
    key = next((p['value'] for p in parameters if p['name'] == 'key'), None)

    try:
        response = s3.get_object(Bucket=bucket, Key=key)
        content = response['Body'].read().decode('utf-8')

        return {
            'messageVersion': '1.0',
            'response': {
                'actionGroup': event['actionGroup'],
                'apiPath': event['apiPath'],
                'httpMethod': event['httpMethod'],
                'httpStatusCode': 200,
                'responseBody': {
                    'application/json': {
                        'body': json.dumps({
                            'success': True,
                            'content': content,
                            'size': len(content)
                        })
                    }
                }
            }
        }
    except Exception as e:
        return {
            'messageVersion': '1.0',
            'response': {
                'actionGroup': event['actionGroup'],
                'apiPath': event['apiPath'],
                'httpMethod': event['httpMethod'],
                'httpStatusCode': 500,
                'responseBody': {
                    'application/json': {
                        'body': json.dumps({
                            'success': False,
                            'error': str(e)
                        })
                    }
                }
            }
        }
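Bedrock Agents delivers action-group input as a `parameters` list, which the handlers above unpack inline. A sketch of that event shape with an extraction helper (field values are illustrative, not a complete event):

```python
# Illustrative shape of the event Bedrock Agents sends to an action-group Lambda
sample_event = {
    'actionGroup': 'sales-tools',
    'apiPath': '/read-file',
    'httpMethod': 'POST',
    'parameters': [
        {'name': 'bucket', 'type': 'string', 'value': 'my-sales-bucket'},
        {'name': 'key', 'type': 'string', 'value': 'sales_2024_12.csv'},
    ],
}

def get_param(event, name):
    """Extract a named parameter, as the handlers above do inline."""
    return next((p['value'] for p in event.get('parameters', []) if p['name'] == name), None)

print(get_param(sample_event, 'bucket'))   # my-sales-bucket
print(get_param(sample_event, 'missing'))  # None
```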

Lambda: execute_python

import json
import sys
from io import StringIO
import pandas as pd
import numpy as np

def lambda_handler(event, context):
    """Execute Python code safely"""

    parameters = event.get('parameters', [])
    code = next((p['value'] for p in parameters if p['name'] == 'code'), None)
    data = next((p['value'] for p in parameters if p['name'] == 'data'), None)

    try:
        # Create a restricted execution environment
        local_vars = {
            'pd': pd,
            'np': np,
            'data': data
        }

        # Capture stdout
        old_stdout = sys.stdout
        sys.stdout = captured_output = StringIO()

        # Execute code
        exec(code, {"__builtins__": {}}, local_vars)

        # Get output
        sys.stdout = old_stdout
        output = captured_output.getvalue()

        # Get result variable if exists
        result = local_vars.get('result', output)

        return {
            'messageVersion': '1.0',
            'response': {
                'actionGroup': event['actionGroup'],
                'apiPath': event['apiPath'],
                'httpMethod': event['httpMethod'],
                'httpStatusCode': 200,
                'responseBody': {
                    'application/json': {
                        'body': json.dumps({
                            'success': True,
                            'result': str(result),
                            'output': output
                        })
                    }
                }
            }
        }
    except Exception as e:
        return {
            'messageVersion': '1.0',
            'response': {
                'actionGroup': event['actionGroup'],
                'apiPath': event['apiPath'],
                'httpMethod': event['httpMethod'],
                'httpStatusCode': 500,
                'responseBody': {
                    'application/json': {
                        'body': json.dumps({
                            'success': False,
                            'error': str(e)
                        })
                    }
                }
            }
        }
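The empty `__builtins__` passed to `exec` above blocks builtin name lookups, but this is defense against accidents rather than a hardened sandbox; determined code can escape it. A standalone sketch of the same mechanism:

```python
import sys
from io import StringIO

def run_restricted(code, local_vars):
    """Mirror of the Lambda's exec step: empty __builtins__, stdout captured."""
    old_stdout = sys.stdout
    sys.stdout = captured = StringIO()
    try:
        exec(code, {"__builtins__": {}}, local_vars)
    finally:
        sys.stdout = old_stdout
    return local_vars.get('result', captured.getvalue())

print(run_restricted("result = 2 + 3", {}))  # 5

# Builtin name lookups fail inside the restricted namespace
try:
    run_restricted("result = open('/etc/passwd')", {})
except NameError:
    print("open() blocked")
```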

Lambda: write_s3_file

import boto3
import json

s3 = boto3.client('s3')

def lambda_handler(event, context):
    """Write file to S3"""

    parameters = event.get('parameters', [])
    bucket = next((p['value'] for p in parameters if p['name'] == 'bucket'), None)
    key = next((p['value'] for p in parameters if p['name'] == 'key'), None)
    content = next((p['value'] for p in parameters if p['name'] == 'content'), None)

    try:
        s3.put_object(
            Bucket=bucket,
            Key=key,
            Body=content.encode('utf-8'),
            ContentType='text/markdown'
        )

        return {
            'messageVersion': '1.0',
            'response': {
                'actionGroup': event['actionGroup'],
                'apiPath': event['apiPath'],
                'httpMethod': event['httpMethod'],
                'httpStatusCode': 200,
                'responseBody': {
                    'application/json': {
                        'body': json.dumps({
                            'success': True,
                            'message': f'File written to s3://{bucket}/{key}'
                        })
                    }
                }
            }
        }
    except Exception as e:
        return {
            'messageVersion': '1.0',
            'response': {
                'actionGroup': event['actionGroup'],
                'apiPath': event['apiPath'],
                'httpMethod': event['httpMethod'],
                'httpStatusCode': 500,
                'responseBody': {
                    'application/json': {
                        'body': json.dumps({
                            'success': False,
                            'error': str(e)
                        })
                    }
                }
            }
        }
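All three handlers repeat the same `messageVersion` envelope; in practice it helps to factor it into a shared helper. A sketch:

```python
import json

def agent_response(event, status_code, body):
    """Build the response envelope Bedrock Agents expects from an
    action-group Lambda (the shape repeated in the handlers above)."""
    return {
        'messageVersion': '1.0',
        'response': {
            'actionGroup': event['actionGroup'],
            'apiPath': event['apiPath'],
            'httpMethod': event['httpMethod'],
            'httpStatusCode': status_code,
            'responseBody': {
                'application/json': {'body': json.dumps(body)}
            }
        }
    }

# A handler's success path then collapses to:
#   return agent_response(event, 200, {'success': True, 'content': content})
# and its error path to:
#   return agent_response(event, 500, {'success': False, 'error': str(e)})
```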

Step 2: Create OpenAPI Schema for Action Groups

action-group-schema.json

{
  "openapi": "3.0.0",
  "info": {
    "title": "Sales Analysis Tools",
    "version": "1.0.0",
    "description": "Tools for reading, analyzing, and writing sales data"
  },
  "paths": {
    "/read-file": {
      "post": {
        "summary": "Read a file from S3",
        "description": "Reads the content of a file stored in Amazon S3",
        "operationId": "readS3File",
        "requestBody": {
          "required": true,
          "content": {
            "application/json": {
              "schema": {
                "type": "object",
                "properties": {
                  "bucket": {
                    "type": "string",
                    "description": "S3 bucket name"
                  },
                  "key": {
                    "type": "string",
                    "description": "S3 object key (file path)"
                  }
                },
                "required": ["bucket", "key"]
              }
            }
          }
        },
        "responses": {
          "200": {
            "description": "File content retrieved successfully",
            "content": {
              "application/json": {
                "schema": {
                  "type": "object",
                  "properties": {
                    "success": { "type": "boolean" },
                    "content": { "type": "string" },
                    "size": { "type": "integer" }
                  }
                }
              }
            }
          }
        }
      }
    },
    "/execute-python": {
      "post": {
        "summary": "Execute Python code for data analysis",
        "description": "Executes Python code with pandas and numpy available",
        "operationId": "executePython",
        "requestBody": {
          "required": true,
          "content": {
            "application/json": {
              "schema": {
                "type": "object",
                "properties": {
                  "code": {
                    "type": "string",
                    "description": "Python code to execute"
                  },
                  "data": {
                    "type": "string",
                    "description": "Input data (CSV or JSON string)"
                  }
                },
                "required": ["code"]
              }
            }
          }
        },
        "responses": {
          "200": {
            "description": "Code executed successfully",
            "content": {
              "application/json": {
                "schema": {
                  "type": "object",
                  "properties": {
                    "success": { "type": "boolean" },
                    "result": { "type": "string" },
                    "output": { "type": "string" }
                  }
                }
              }
            }
          }
        }
      }
    },
    "/write-file": {
      "post": {
        "summary": "Write a file to S3",
        "description": "Writes content to a file in Amazon S3",
        "operationId": "writeS3File",
        "requestBody": {
          "required": true,
          "content": {
            "application/json": {
              "schema": {
                "type": "object",
                "properties": {
                  "bucket": {
                    "type": "string",
                    "description": "S3 bucket name"
                  },
                  "key": {
                    "type": "string",
                    "description": "S3 object key (file path)"
                  },
                  "content": {
                    "type": "string",
                    "description": "File content to write"
                  }
                },
                "required": ["bucket", "key", "content"]
              }
            }
          }
        },
        "responses": {
          "200": {
            "description": "File written successfully",
            "content": {
              "application/json": {
                "schema": {
                  "type": "object",
                  "properties": {
                    "success": { "type": "boolean" },
                    "message": { "type": "string" }
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}

Step 3: Create Bedrock Agent via AWS Console or CLI

Using AWS CLI:

# Create agent
aws bedrock-agent create-agent \
  --agent-name sales-analysis-agent \
  --foundation-model anthropic.claude-3-sonnet-20240229-v1:0 \
  --instruction "You are a sales data analyst. When asked to analyze sales data:
1. Read the data file from S3
2. Use Python to analyze the data (calculate totals, trends, insights)
3. Create a formatted markdown report with your findings
4. Save the report to S3
Always explain your reasoning at each step." \
  --agent-resource-role-arn arn:aws:iam::ACCOUNT_ID:role/BedrockAgentRole

# Create action group
# Note: an action group maps to a single Lambda; either route on
# event['apiPath'] inside one function, or create one action group per Lambda
aws bedrock-agent create-agent-action-group \
  --agent-id AGENT_ID \
  --agent-version DRAFT \
  --action-group-name sales-tools \
  --action-group-executor lambda=arn:aws:lambda:REGION:ACCOUNT_ID:function:read_s3_file \
  --api-schema file://action-group-schema.json

# Prepare agent
aws bedrock-agent prepare-agent --agent-id AGENT_ID

# Create alias
aws bedrock-agent create-agent-alias \
  --agent-id AGENT_ID \
  --agent-alias-name production

Step 4: Invoke the Agent

import boto3
import json

bedrock_agent_runtime = boto3.client('bedrock-agent-runtime')

def invoke_agent(agent_id, agent_alias_id, session_id, prompt):
    """Invoke Bedrock Agent"""

    response = bedrock_agent_runtime.invoke_agent(
        agentId=agent_id,
        agentAliasId=agent_alias_id,
        sessionId=session_id,
        inputText=prompt
    )

    # Stream the response
    completion = ""
    for event in response.get('completion', []):
        chunk = event.get('chunk')
        if chunk:
            completion += chunk.get('bytes').decode()

    return completion

# Example usage
result = invoke_agent(
    agent_id='YOUR_AGENT_ID',
    agent_alias_id='YOUR_ALIAS_ID',
    session_id='session-123',
    prompt='Analyze the sales data from s3://my-bucket/sales_2024_12.csv and create a report'
)

print(result)

Option 2: Agent Core with Bedrock (Custom)

Build a custom agent orchestrator using Python and Bedrock's Converse API.

Complete Agent Implementation

bedrock_agent.py

import boto3
import json
import re
from typing import List, Dict, Any, Callable

class BedrockAgent:
    """Custom agent using Amazon Bedrock"""

    def __init__(self, model_id: str = "anthropic.claude-3-sonnet-20240229-v1:0"):
        self.bedrock = boto3.client('bedrock-runtime')
        self.s3 = boto3.client('s3')
        self.model_id = model_id
        self.conversation_history = []
        self.max_iterations = 10

        # Register available tools
        self.tools = {
            'read_s3_file': self.read_s3_file,
            'execute_python': self.execute_python,
            'write_s3_file': self.write_s3_file
        }

    def get_system_prompt(self) -> str:
        """System instructions for the agent"""
        return """You are a helpful AI assistant with access to tools for data analysis.

When given a task, follow this pattern:
1. Think about what you need to do
2. Use available tools to gather information or perform actions
3. Analyze the results
4. Continue until you have completed the task

Available tools:
- read_s3_file(bucket, key): Read a file from S3
- execute_python(code, data): Execute Python code for analysis (pandas and numpy available)
- write_s3_file(bucket, key, content): Write content to S3

Always explain your reasoning before using a tool.

To use a tool, format your response as:
THOUGHT: [your reasoning]
ACTION: tool_name(param1="value1", param2="value2")

When you have completed the task, respond with:
FINAL_ANSWER: [your response to the user]"""

    def read_s3_file(self, bucket: str, key: str) -> Dict[str, Any]:
        """Tool: Read file from S3"""
        try:
            response = self.s3.get_object(Bucket=bucket, Key=key)
            content = response['Body'].read().decode('utf-8')
            return {
                'success': True,
                'content': content,
                'size': len(content)
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }

    def execute_python(self, code: str, data: str = None) -> Dict[str, Any]:
        """Tool: Execute Python code (simplified - use Lambda for production)"""
        try:
            import pandas as pd
            import numpy as np
            from io import StringIO

            # Create execution environment
            local_vars = {
                'pd': pd,
                'np': np,
                'StringIO': StringIO
            }

            if data:
                local_vars['data'] = data

            # Execute code
            exec(code, {"__builtins__": __builtins__}, local_vars)

            # Get result
            result = local_vars.get('result', 'Code executed successfully')

            return {
                'success': True,
                'result': str(result)
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }

    def write_s3_file(self, bucket: str, key: str, content: str) -> Dict[str, Any]:
        """Tool: Write file to S3"""
        try:
            self.s3.put_object(
                Bucket=bucket,
                Key=key,
                Body=content.encode('utf-8'),
                ContentType='text/markdown'
            )
            return {
                'success': True,
                'message': f'File written to s3://{bucket}/{key}'
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }

    def parse_action(self, text: str) -> tuple:
        """Parse ACTION from LLM response"""
        action_pattern = r'ACTION:\s*(\w+)\((.*?)\)'
        match = re.search(action_pattern, text, re.DOTALL)

        if not match:
            return None, None

        tool_name = match.group(1)
        params_str = match.group(2)

        # Parse parameters (simplified)
        params = {}
        param_pattern = r'(\w+)=["\'](.*?)["\']'
        for param_match in re.finditer(param_pattern, params_str):
            params[param_match.group(1)] = param_match.group(2)

        return tool_name, params

    def call_bedrock(self, messages: List[Dict]) -> str:
        """Call Bedrock Converse API"""
        response = self.bedrock.converse(
            modelId=self.model_id,
            messages=messages,
            system=[{"text": self.get_system_prompt()}],
            inferenceConfig={
                "temperature": 0.7,
                "maxTokens": 2048
            }
        )

        return response['output']['message']['content'][0]['text']

    def run(self, user_prompt: str) -> str:
        """Main agent loop"""

        # Initialize conversation
        self.conversation_history = [
            {
                "role": "user",
                "content": [{"text": user_prompt}]
            }
        ]

        iteration = 0

        while iteration < self.max_iterations:
            iteration += 1
            print(f"\n--- Iteration {iteration} ---")

            # Get LLM response
            llm_response = self.call_bedrock(self.conversation_history)
            print(f"LLM Response:\n{llm_response}\n")

            # Add to history
            self.conversation_history.append({
                "role": "assistant",
                "content": [{"text": llm_response}]
            })

            # Check for final answer
            if "FINAL_ANSWER:" in llm_response:
                final_answer = llm_response.split("FINAL_ANSWER:")[1].strip()
                return final_answer

            # Parse and execute action
            tool_name, params = self.parse_action(llm_response)

            if tool_name and tool_name in self.tools:
                print(f"Executing: {tool_name}({params})")

                # Execute tool
                tool_result = self.tools[tool_name](**params)
                print(f"Tool Result: {tool_result}\n")

                # Add result to conversation
                observation = f"OBSERVATION: {json.dumps(tool_result, indent=2)}"
                self.conversation_history.append({
                    "role": "user",
                    "content": [{"text": observation}]
                })
            else:
                # No valid action found, ask LLM to continue
                self.conversation_history.append({
                    "role": "user",
                    "content": [{"text": "Please continue with your next action or provide the final answer."}]
                })

        return "Max iterations reached. Task may be incomplete."


# Example usage
if __name__ == "__main__":
    agent = BedrockAgent()

    result = agent.run(
        "Analyze the sales data from s3://my-sales-bucket/sales_2024_12.csv "
        "and create a report saved to s3://my-sales-bucket/reports/december_report.md"
    )

    print("\n=== FINAL RESULT ===")
    print(result)
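The `ACTION:` parsing can be exercised standalone. Note the caveat: the simple regex assumes quoted parameter values contain no closing parenthesis, which is fine for the prompts in this guide but would break on arbitrary code arguments:

```python
import re

def parse_action(text):
    """Same regexes as BedrockAgent.parse_action, shown standalone."""
    m = re.search(r'ACTION:\s*(\w+)\((.*?)\)', text, re.DOTALL)
    if not m:
        return None, None
    params = dict(re.findall(r'(\w+)=["\'](.*?)["\']', m.group(2)))
    return m.group(1), params

reply = 'THOUGHT: read the file first\nACTION: read_s3_file(bucket="my-bucket", key="data.csv")'
print(parse_action(reply))  # ('read_s3_file', {'bucket': 'my-bucket', 'key': 'data.csv'})
```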

Example Agent Execution Flow

User: "Analyze the sales data from last month and create a report"

--- Iteration 1 ---
LLM Response:
THOUGHT: I need to first read the sales data file from S3 to see what we're working with.
ACTION: read_s3_file(bucket="my-sales-bucket", key="sales_2024_12.csv")

Executing: read_s3_file({'bucket': 'my-sales-bucket', 'key': 'sales_2024_12.csv'})
Tool Result: {
  "success": true,
  "content": "date,product,quantity,revenue\n2024-12-01,Widget A,50,5000\n...",
  "size": 2048
}

--- Iteration 2 ---
LLM Response:
THOUGHT: Now I have the data. I'll use Python to analyze it and calculate key metrics.
ACTION: execute_python(code="import pandas as pd\nfrom io import StringIO\ndf = pd.read_csv(StringIO(data))\nresult = {'total_revenue': df['revenue'].sum(), 'total_quantity': df['quantity'].sum(), 'avg_order': df['revenue'].mean()}", data="date,product,quantity,revenue\n...")

Executing: execute_python(...)
Tool Result: {
  "success": true,
  "result": "{'total_revenue': 125000, 'total_quantity': 2500, 'avg_order': 4166.67}"
}

--- Iteration 3 ---
LLM Response:
THOUGHT: I have the analysis results. Now I'll create a formatted markdown report.
ACTION: write_s3_file(bucket="my-sales-bucket", key="reports/december_report.md", content="# Sales Report - December 2024\n\n## Summary\n- Total Revenue: $125,000\n- Total Units Sold: 2,500\n- Average Order Value: $4,166.67\n...")

Executing: write_s3_file(...)
Tool Result: {
  "success": true,
  "message": "File written to s3://my-sales-bucket/reports/december_report.md"
}

--- Iteration 4 ---
LLM Response:
FINAL_ANSWER: I've successfully analyzed the December 2024 sales data and created a comprehensive report. The analysis shows total revenue of $125,000 from 2,500 units sold, with an average order value of $4,166.67. The full report has been saved to s3://my-sales-bucket/reports/december_report.md

Testing the Agent

Test Script

import boto3
from bedrock_agent import BedrockAgent

def test_agent():
    """Test the Bedrock agent"""

    agent = BedrockAgent()

    # Test cases
    test_cases = [
        "Read the file s3://my-bucket/test.txt",
        "Calculate the sum of numbers 1 to 100 using Python",
        "Analyze sales data from s3://sales-bucket/data.csv and create a summary report"
    ]

    for i, test in enumerate(test_cases, 1):
        print(f"\n{'='*60}")
        print(f"Test Case {i}: {test}")
        print('='*60)

        result = agent.run(test)
        print(f"\nResult: {result}")

if __name__ == "__main__":
    test_agent()

Sample Data for Testing

sales_2024_12.csv

date,product,category,quantity,unit_price,revenue
2024-12-01,Widget A,Electronics,50,100,5000
2024-12-01,Widget B,Electronics,30,150,4500
2024-12-02,Gadget X,Home,75,80,6000
2024-12-02,Gadget Y,Home,40,120,4800
2024-12-03,Widget A,Electronics,60,100,6000
2024-12-03,Tool Z,Industrial,25,200,5000
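As a local sanity check, this is the kind of analysis the agent's execute_python step would run against the sample above (totals computed from the six rows):

```python
from io import StringIO
import pandas as pd

csv_data = """date,product,category,quantity,unit_price,revenue
2024-12-01,Widget A,Electronics,50,100,5000
2024-12-01,Widget B,Electronics,30,150,4500
2024-12-02,Gadget X,Home,75,80,6000
2024-12-02,Gadget Y,Home,40,120,4800
2024-12-03,Widget A,Electronics,60,100,6000
2024-12-03,Tool Z,Industrial,25,200,5000
"""

df = pd.read_csv(StringIO(csv_data))

# The metrics the agent computes in its execute_python step
summary = {
    'total_revenue': int(df['revenue'].sum()),
    'total_quantity': int(df['quantity'].sum()),
    'revenue_by_category': df.groupby('category')['revenue'].sum().to_dict(),
}
print(summary['total_revenue'], summary['total_quantity'])  # 31300 280
```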

Cost Considerations

Bedrock Pricing (Example for Claude 3 Sonnet)

Input tokens: $0.003 per 1K tokens
Output tokens: $0.015 per 1K tokens

Example agent execution:

Iteration 1: 500 input + 200 output tokens
Iteration 2: 800 input + 300 output tokens
Iteration 3: 1000 input + 400 output tokens
Iteration 4: 1200 input + 150 output tokens

Total: 3,500 input tokens + 1,050 output tokens
Cost: (3.5 × $0.003) + (1.05 × $0.015) = $0.0105 + $0.0158 = $0.0263
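The arithmetic above generalizes to a small estimator. The rates are the example Claude 3 Sonnet prices quoted here; check current AWS pricing, which varies by model and region:

```python
# Example Claude 3 Sonnet rates (illustrative; verify against current AWS pricing)
INPUT_RATE = 0.003 / 1000    # $ per input token
OUTPUT_RATE = 0.015 / 1000   # $ per output token

def estimate_cost(iterations):
    """Estimate a run's cost from (input_tokens, output_tokens) per iteration."""
    total_in = sum(i for i, _ in iterations)
    total_out = sum(o for _, o in iterations)
    return total_in * INPUT_RATE + total_out * OUTPUT_RATE

run = [(500, 200), (800, 300), (1000, 400), (1200, 150)]
print(f"${estimate_cost(run):.4f}")  # the ~$0.026 worked example above
```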

Cost Optimization Tips

  1. Use smaller models for simple tasks

    • Claude 3 Haiku for basic operations
    • Claude 3 Sonnet for balanced performance
    • Claude 3 Opus only for complex reasoning
  2. Implement caching

    • Cache system prompts and tool definitions
    • Reuse conversation context
  3. Limit iterations

    • Set max_iterations to prevent runaway costs
    • Use more specific prompts
  4. Batch operations

    • Process multiple files in one agent run
    • Combine related tasks

Advanced Features

Adding Memory to the Agent

import time

class BedrockAgentWithMemory(BedrockAgent):
    """Agent with persistent memory"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.dynamodb = boto3.resource('dynamodb')
        self.memory_table = self.dynamodb.Table('agent-memory')

    def save_memory(self, session_id: str, key: str, value: Any):
        """Save to long-term memory"""
        self.memory_table.put_item(
            Item={
                'session_id': session_id,
                'key': key,
                'value': json.dumps(value),
                'timestamp': int(time.time())
            }
        )

    def retrieve_memory(self, session_id: str, key: str) -> Any:
        """Retrieve from long-term memory"""
        response = self.memory_table.get_item(
            Key={'session_id': session_id, 'key': key}
        )
        if 'Item' in response:
            return json.loads(response['Item']['value'])
        return None

Adding Knowledge Base Integration

def add_knowledge_base_tool(self):
    """Add Bedrock Knowledge Base as a tool"""

    def search_knowledge_base(query: str) -> Dict[str, Any]:
        bedrock_agent = boto3.client('bedrock-agent-runtime')

        response = bedrock_agent.retrieve(
            knowledgeBaseId='YOUR_KB_ID',
            retrievalQuery={'text': query}
        )

        results = []
        for result in response['retrievalResults']:
            results.append({
                'content': result['content']['text'],
                'score': result['score']
            })

        return {'success': True, 'results': results}

    self.tools['search_knowledge_base'] = search_knowledge_base

Summary

This guide demonstrated two approaches to building AI agents with AWS Bedrock:

  1. Managed Bedrock Agents: Fully managed, integrated with Lambda action groups
  2. Custom Agent Core: Full control, custom orchestration logic

Both approaches follow the ReAct pattern (Reasoning + Acting) and enable:

  • Multi-step task execution
  • Tool/function calling
  • Iterative problem solving
  • Integration with AWS services

Choose Bedrock Agents for quick deployment and managed infrastructure, or build custom agents for maximum flexibility and control.